package com.watertek.bdcenter.bdcode.service.impl;

import com.watertek.bdcenter.bdcode.service.HiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

//@Transactional
@Service("hiveService")
public class HiveServiceImpl extends AbstractService implements HiveService {

    private static final Logger logger = LoggerFactory.getLogger(HiveServiceImpl.class);

    /**
     * 将源数据库的表结构复制到hive
     * @param sqoopBinFilePath
     * @param url
     * @param username
     * @param password
     * @param tableName
     * @param hiveDatabase
     * @param hiveTableName
     */
    @Override
    public void copyTableStructure(String sqoopBinFilePath, String url, String username, String password,
                                   String tableName, String hiveDatabase, String hiveTableName){
        logger.info("copyTableStructure method start");

        List<String> cmdList = new ArrayList<>();
        cmdList.add(sqoopBinFilePath); // sqoop命令的路径
        cmdList.add("create-hive-table");
        cmdList.add("--connect");
        cmdList.add(url);   // mysql的url
        cmdList.add("--username");  // 用户名
        cmdList.add(username);
        cmdList.add("--password");  // 密码
        cmdList.add(password);
        cmdList.add("--table");     // mysql的表名
        cmdList.add(tableName);
        cmdList.add("--hcatalog-database");     // hive数据库名
        cmdList.add(hiveDatabase);
        cmdList.add("--hcatalog-table");    // hive的表名
        cmdList.add(hiveTableName);
        cmdList.add("--create-hcatalog-table");
        cmdList.add("--hcatalog-storage-stanza");
        cmdList.add("\"stored as orc\"");
        String[] cmd = cmdList.toArray(new String[cmdList.size()]);
        try {
            Runtime.getRuntime().exec(cmd);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 将关系型数据的表导入hive中
     * @param sqoopBinFilePath
     * @param url
     * @param username
     * @param password
     * @param tableName
     * @param hiveDatabase
     * @param hiveTableName
     */
    @Override
    public void importHiveTable(String sqoopBinFilePath, String url, String username, String password,
                                String tableName, String hiveDatabase, String hiveTableName) {
        logger.info("importHiveTable method start");

        List<String> cmdList = new ArrayList<>();
        cmdList.add(sqoopBinFilePath); // sqoop命令的路径
        cmdList.add("import");
        cmdList.add("--connect");
        cmdList.add(url);   // mysql的url
        cmdList.add("--username");  // 用户名
        cmdList.add(username);
        cmdList.add("--password");  // 密码
        cmdList.add(password);
        cmdList.add("--table");     // mysql的表名
        cmdList.add(tableName);
        cmdList.add("--hive-database");     // hive数据库名
        cmdList.add(hiveDatabase);
        cmdList.add("--hive-import");
        cmdList.add("--hive-table");    // hive的表名，首先创建txt表，以_txt结尾
        cmdList.add(hiveTableName+HIVE_TEMP_TABLE_SUFFIX);
        cmdList.add("--fields-terminated-by");
        cmdList.add("','");
        cmdList.add("--hive-overwrite");

        String[] cmd = cmdList.toArray(new String[cmdList.size()]);
        try {
            Process process=Runtime.getRuntime().exec(cmd);
            process.waitFor();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 对指定的hive表进行手工编码
     * @param databaseName
     * @param tableName
     * @param id
     * @param bdCode
     */
    @Override
    public void manualCode(String databaseName, String tableName, String id, String bdCode) {
        hiveDao.manualCode(databaseName, tableName, id, bdCode);
    }

    /**
     * 根据tableName，获取hive表的元数据，并创建对应的orc表
     * @param tableName
     */
    @Override
    public void createOrcTable(String tableName) {
        logger.info("createOrcTable method start");

        try {
            // 获取元数据map，并添加bd_code字段
            Map<String, String> map=hiveDao.getMetadataMap(tableName+HIVE_TEMP_TABLE_SUFFIX);
            map.put("bd_code", "string");
            // 根据元数据创建orc表
            hiveDao.createOrcTable(tableName, map);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 存储元数据
     * @param tableName
     */
    @Override
    public void storeMetadata(String tableName) {
        Map<String, String> map = hiveDao.getMetadataMap(tableName);
        hiveDao.storeMetadata(tableName, map);
    }

    /**
     * 将originTable表中的数据插入到destTable表中
     * @param originTable
     * @param destTable
     * @param layer
     */
    @Override
    public void insertOrcTable(String originTable, String destTable, String layer) {
        hiveDao.insertOrcTable(originTable, destTable, layer);
    }

    /**
     * 删除表格
     * @param tableName
     */
    @Override
    public void dropTable(String tableName){
        hiveDao.dropTable(tableName);
    }

    /**
     * 查询表格中的记录
     * @param tableName
     * @return
     */
    @Override
    public List findAll(String tableName){
        return hiveDao.findAll(tableName);
    }

    /**
     * 根据id,查询表格中的某一条记录
     * @param tableName
     * @param id
     * @return
     */
    @Override
    public List findById(String tableName, String id){
        return hiveDao.findById(tableName, id);
    }

    /**
     * 根据北斗网格编码bdCode，查询表格中的某一条记录
     * @param tableName
     * @param bdCode
     * @return
     */
    public List findByBdCode(String tableName, String bdCode){
        return hiveDao.findByBdCode(tableName, bdCode);
    }

    /**
     * 对某个表进行分页查询
     * @param tableName
     * @param startNo
     * @param maxCount
     */
    public List findByPage(String tableName, int startNo, int maxCount){
        return hiveDao.findByPage(tableName, startNo, maxCount);
    }
}
